1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  import static org.mockito.Matchers.any;
19  import static org.mockito.Mockito.never;
20  import static org.mockito.Mockito.times;
21  import static org.mockito.Mockito.verify;
22  
23  import java.util.Arrays;
24  
25  import org.junit.Before;
26  import org.junit.Test;
27  import org.mockito.Mock;
28  import org.mockito.MockitoAnnotations;
29  
30  import rx.Observable;
31  import rx.Observer;
32  import rx.Subscriber;
33  import rx.functions.Action1;
34  import rx.functions.Func1;
35  import rx.functions.Func2;
36  import rx.subjects.PublishSubject;
37  
38  public class OnSubscribeGroupJoinTest {
39      @Mock
40      Observer<Object> observer;
41  
42      Func2<Integer, Integer, Integer> add = new Func2<Integer, Integer, Integer>() {
43          @Override
44          public Integer call(Integer t1, Integer t2) {
45              return t1 + t2;
46          }
47      };
48  
49      <T> Func1<Integer, Observable<T>> just(final Observable<T> observable) {
50          return new Func1<Integer, Observable<T>>() {
51              @Override
52              public Observable<T> call(Integer t1) {
53                  return observable;
54              }
55          };
56      }
57  
58      <T, R> Func1<T, Observable<R>> just2(final Observable<R> observable) {
59          return new Func1<T, Observable<R>>() {
60              @Override
61              public Observable<R> call(T t1) {
62                  return observable;
63              }
64          };
65      }
66  
67      Func2<Integer, Observable<Integer>, Observable<Integer>> add2 = new Func2<Integer, Observable<Integer>, Observable<Integer>>() {
68          @Override
69          public Observable<Integer> call(final Integer leftValue, Observable<Integer> rightValues) {
70              return rightValues.map(new Func1<Integer, Integer>() {
71                  @Override
72                  public Integer call(Integer rightValue) {
73                      return add.call(leftValue, rightValue);
74                  }
75              });
76          }
77  
78      };
79  
80      @Before
81      public void before() {
82          MockitoAnnotations.initMocks(this);
83      }
84  
85      @Test
86      public void behaveAsJoin() {
87          PublishSubject<Integer> source1 = PublishSubject.create();
88          PublishSubject<Integer> source2 = PublishSubject.create();
89  
90          Observable<Integer> m = Observable.merge(source1.groupJoin(source2,
91                  just(Observable.never()),
92                  just(Observable.never()), add2));
93  
94          m.subscribe(observer);
95  
96          source1.onNext(1);
97          source1.onNext(2);
98          source1.onNext(4);
99  
100         source2.onNext(16);
101         source2.onNext(32);
102         source2.onNext(64);
103 
104         source1.onCompleted();
105         source2.onCompleted();
106 
107         verify(observer, times(1)).onNext(17);
108         verify(observer, times(1)).onNext(18);
109         verify(observer, times(1)).onNext(20);
110         verify(observer, times(1)).onNext(33);
111         verify(observer, times(1)).onNext(34);
112         verify(observer, times(1)).onNext(36);
113         verify(observer, times(1)).onNext(65);
114         verify(observer, times(1)).onNext(66);
115         verify(observer, times(1)).onNext(68);
116 
117         verify(observer, times(1)).onCompleted(); //Never emitted?
118         verify(observer, never()).onError(any(Throwable.class));
119     }
120 
121     class Person {
122         final int id;
123         final String name;
124 
125         public Person(int id, String name) {
126             this.id = id;
127             this.name = name;
128         }
129     }
130 
131     class PersonFruit {
132         final int personId;
133         final String fruit;
134 
135         public PersonFruit(int personId, String fruit) {
136             this.personId = personId;
137             this.fruit = fruit;
138         }
139     }
140 
141     class PPF {
142         final Person person;
143         final Observable<PersonFruit> fruits;
144 
145         public PPF(Person person, Observable<PersonFruit> fruits) {
146             this.person = person;
147             this.fruits = fruits;
148         }
149     }
150 
151     @Test
152     public void normal1() {
153         Observable<Person> source1 = Observable.from(Arrays.asList(
154                 new Person(1, "Joe"),
155                 new Person(2, "Mike"),
156                 new Person(3, "Charlie")
157                 ));
158 
159         Observable<PersonFruit> source2 = Observable.from(Arrays.asList(
160                 new PersonFruit(1, "Strawberry"),
161                 new PersonFruit(1, "Apple"),
162                 new PersonFruit(3, "Peach")
163                 ));
164 
165         Observable<PPF> q = source1.groupJoin(
166                 source2,
167                 just2(Observable.<Object> never()),
168                 just2(Observable.<Object> never()),
169                 new Func2<Person, Observable<PersonFruit>, PPF>() {
170                     @Override
171                     public PPF call(Person t1, Observable<PersonFruit> t2) {
172                         return new PPF(t1, t2);
173                     }
174                 });
175 
176         q.subscribe(
177                 new Subscriber<PPF>() {
178                     @Override
179                     public void onNext(final PPF ppf) {
180                         ppf.fruits.filter(new Func1<PersonFruit, Boolean>() {
181                             @Override
182                             public Boolean call(PersonFruit t1) {
183                                 return ppf.person.id == t1.personId;
184                             }
185                         }).subscribe(new Action1<PersonFruit>() {
186                             @Override
187                             public void call(PersonFruit t1) {
188                                 observer.onNext(Arrays.asList(ppf.person.name, t1.fruit));
189                             }
190                         });
191                     }
192 
193                     @Override
194                     public void onError(Throwable e) {
195                         observer.onError(e);
196                     }
197 
198                     @Override
199                     public void onCompleted() {
200                         observer.onCompleted();
201                     }
202 
203                 }
204                 );
205 
206         verify(observer, times(1)).onNext(Arrays.asList("Joe", "Strawberry"));
207         verify(observer, times(1)).onNext(Arrays.asList("Joe", "Apple"));
208         verify(observer, times(1)).onNext(Arrays.asList("Charlie", "Peach"));
209 
210         verify(observer, times(1)).onCompleted();
211         verify(observer, never()).onError(any(Throwable.class));
212     }
213 
214     @Test
215     public void leftThrows() {
216         PublishSubject<Integer> source1 = PublishSubject.create();
217         PublishSubject<Integer> source2 = PublishSubject.create();
218 
219         Observable<Observable<Integer>> m = source1.groupJoin(source2,
220                 just(Observable.never()),
221                 just(Observable.never()), add2);
222 
223         m.subscribe(observer);
224 
225         source2.onNext(1);
226         source1.onError(new RuntimeException("Forced failure"));
227 
228         verify(observer, times(1)).onError(any(Throwable.class));
229         verify(observer, never()).onCompleted();
230         verify(observer, never()).onNext(any());
231     }
232 
233     @Test
234     public void rightThrows() {
235         PublishSubject<Integer> source1 = PublishSubject.create();
236         PublishSubject<Integer> source2 = PublishSubject.create();
237 
238         Observable<Observable<Integer>> m = source1.groupJoin(source2,
239                 just(Observable.never()),
240                 just(Observable.never()), add2);
241 
242         m.subscribe(observer);
243 
244         source1.onNext(1);
245         source2.onError(new RuntimeException("Forced failure"));
246 
247         verify(observer, times(1)).onNext(any(Observable.class));
248         verify(observer, times(1)).onError(any(Throwable.class));
249         verify(observer, never()).onCompleted();
250     }
251 
252     @Test
253     public void leftDurationThrows() {
254         PublishSubject<Integer> source1 = PublishSubject.create();
255         PublishSubject<Integer> source2 = PublishSubject.create();
256 
257         Observable<Integer> duration1 = Observable.<Integer> error(new RuntimeException("Forced failure"));
258 
259         Observable<Observable<Integer>> m = source1.groupJoin(source2,
260                 just(duration1),
261                 just(Observable.never()), add2);
262         m.subscribe(observer);
263 
264         source1.onNext(1);
265 
266         verify(observer, times(1)).onError(any(Throwable.class));
267         verify(observer, never()).onCompleted();
268         verify(observer, never()).onNext(any());
269     }
270 
271     @Test
272     public void rightDurationThrows() {
273         PublishSubject<Integer> source1 = PublishSubject.create();
274         PublishSubject<Integer> source2 = PublishSubject.create();
275 
276         Observable<Integer> duration1 = Observable.<Integer> error(new RuntimeException("Forced failure"));
277 
278         Observable<Observable<Integer>> m = source1.groupJoin(source2,
279                 just(Observable.never()),
280                 just(duration1), add2);
281         m.subscribe(observer);
282 
283         source2.onNext(1);
284 
285         verify(observer, times(1)).onError(any(Throwable.class));
286         verify(observer, never()).onCompleted();
287         verify(observer, never()).onNext(any());
288     }
289 
290     @Test
291     public void leftDurationSelectorThrows() {
292         PublishSubject<Integer> source1 = PublishSubject.create();
293         PublishSubject<Integer> source2 = PublishSubject.create();
294 
295         Func1<Integer, Observable<Integer>> fail = new Func1<Integer, Observable<Integer>>() {
296             @Override
297             public Observable<Integer> call(Integer t1) {
298                 throw new RuntimeException("Forced failure");
299             }
300         };
301 
302         Observable<Observable<Integer>> m = source1.groupJoin(source2,
303                 fail,
304                 just(Observable.never()), add2);
305         m.subscribe(observer);
306 
307         source1.onNext(1);
308 
309         verify(observer, times(1)).onError(any(Throwable.class));
310         verify(observer, never()).onCompleted();
311         verify(observer, never()).onNext(any());
312     }
313 
314     @Test
315     public void rightDurationSelectorThrows() {
316         PublishSubject<Integer> source1 = PublishSubject.create();
317         PublishSubject<Integer> source2 = PublishSubject.create();
318 
319         Func1<Integer, Observable<Integer>> fail = new Func1<Integer, Observable<Integer>>() {
320             @Override
321             public Observable<Integer> call(Integer t1) {
322                 throw new RuntimeException("Forced failure");
323             }
324         };
325 
326         Observable<Observable<Integer>> m = source1.groupJoin(source2,
327                 just(Observable.never()),
328                 fail, add2);
329         m.subscribe(observer);
330 
331         source2.onNext(1);
332 
333         verify(observer, times(1)).onError(any(Throwable.class));
334         verify(observer, never()).onCompleted();
335         verify(observer, never()).onNext(any());
336     }
337 
338     @Test
339     public void resultSelectorThrows() {
340         PublishSubject<Integer> source1 = PublishSubject.create();
341         PublishSubject<Integer> source2 = PublishSubject.create();
342 
343         Func2<Integer, Observable<Integer>, Integer> fail = new Func2<Integer, Observable<Integer>, Integer>() {
344             @Override
345             public Integer call(Integer t1, Observable<Integer> t2) {
346                 throw new RuntimeException("Forced failure");
347             }
348         };
349 
350         Observable<Integer> m = source1.groupJoin(source2,
351                 just(Observable.never()),
352                 just(Observable.never()), fail);
353         m.subscribe(observer);
354 
355         source1.onNext(1);
356         source2.onNext(2);
357 
358         verify(observer, times(1)).onError(any(Throwable.class));
359         verify(observer, never()).onCompleted();
360         verify(observer, never()).onNext(any());
361     }
362 }